home *** CD-ROM | disk | FTP | other *** search
/ Mac Easy 2010 May / Mac Life Ubuntu.iso / casper / filesystem.squashfs / usr / share / pyshared / problem_report.py < prev    next >
Encoding:
Python Source  |  2009-04-06  |  44.8 KB  |  1,417 lines

  1. # vim: set encoding=UTF-8 fileencoding=UTF-8 :
  2.  
  3. '''Store, load, and handle problem reports.
  4.  
  5. Copyright (C) 2006 Canonical Ltd.
  6. Author: Martin Pitt <martin.pitt@ubuntu.com>
  7.  
  8. This program is free software; you can redistribute it and/or modify it
  9. under the terms of the GNU General Public License as published by the
  10. Free Software Foundation; either version 2 of the License, or (at your
  11. option) any later version.  See http://www.gnu.org/copyleft/gpl.html for
  12. the full text of the license.
  13. '''
  14.  
  15. import zlib, base64, time, UserDict, sys, gzip, struct
  16. from cStringIO import StringIO
  17. from email.Encoders import encode_base64
  18. from email.MIMEMultipart import MIMEMultipart
  19. from email.MIMEBase import MIMEBase
  20. from email.MIMEText import MIMEText
  21.  
  22. class CompressedValue:
  23.     '''Represent a ProblemReport value which is gzip compressed.'''
  24.  
  25.     def __init__(self, value=None, name=None):
  26.         '''Initialize an empty CompressedValue object with an optional name.'''
  27.         
  28.         self.gzipvalue = None
  29.         self.name = name
  30.         # By default, compressed values are in gzip format. Earlier versions of
  31.         # problem_report used zlib format (without gzip header). If you have such
  32.         # a case, set legacy_zlib to True.
  33.         self.legacy_zlib = False
  34.  
  35.         if value:
  36.             self.set_value(value)
  37.  
  38.     def set_value(self, value):
  39.         '''Set uncompressed value.'''
  40.  
  41.         out = StringIO()
  42.         gzip.GzipFile(self.name, mode='wb', fileobj=out).write(value)
  43.         self.gzipvalue = out.getvalue()
  44.         self.legacy_zlib = False
  45.  
  46.     def get_value(self):
  47.         '''Return uncompressed value.'''
  48.  
  49.         if not self.gzipvalue:
  50.             return None
  51.  
  52.         if self.legacy_zlib:
  53.             return zlib.decompress(self.gzipvalue)
  54.         return gzip.GzipFile(fileobj=StringIO(self.gzipvalue)).read()
  55.  
  56.     def write(self, file):
  57.         '''Write uncompressed value into given file-like object.'''
  58.  
  59.         assert self.gzipvalue
  60.  
  61.         if self.legacy_zlib:
  62.             file.write(zlib.decompress(self.gzipvalue))
  63.             return
  64.  
  65.         gz = gzip.GzipFile(fileobj=StringIO(self.gzipvalue))
  66.         while True:
  67.             block = gz.read(1048576)
  68.             if not block:
  69.                 break
  70.             file.write(block)
  71.  
  72.     def __len__(self):
  73.         '''Return length of uncompressed value.'''
  74.  
  75.         assert self.gzipvalue
  76.         if self.legacy_zlib:
  77.             return len(self.get_value())
  78.         return int(struct.unpack("<L", self.gzipvalue[-4:])[0])
  79.  
  80.     def splitlines(self):
  81.         '''Behaves like splitlines() for a normal string.'''
  82.  
  83.         return self.get_value().splitlines()
  84.  
  85. class ProblemReport(UserDict.IterableUserDict):
  86.     def __init__(self, type = 'Crash', date = None):
  87.         '''Initialize a fresh problem report.
  88.  
  89.         type can be 'Crash', 'Packaging', 'KernelCrash' or 'KernelOops'.
  90.         date is the desired date/time string; if None (default), the
  91.         current local time is used. '''
  92.  
  93.         if date == None:
  94.             date = time.asctime()
  95.         self.data = {'ProblemType': type, 'Date': date}
  96.  
  97.         # keeps track of keys which were added since the last ctor or load()
  98.         self.old_keys = set()
  99.  
  100.     def load(self, file, binary=True):
  101.         '''Initialize problem report from a file-like object.
  102.         
  103.         If binary is False, binary data is not loaded; the dictionary key is
  104.         created, but its value will be an empty string. If it is true, it is
  105.         transparently uncompressed and available as dictionary string values.
  106.         If binary is 'compressed', the compressed value is retained, and the
  107.         dictionary value will be a CompressedValue object. This is useful if
  108.         the compressed value is still useful (to avoid recompression if the
  109.         file needs to be written back).
  110.  
  111.         Files are in RFC822 format.
  112.         '''
  113.         self.data.clear()
  114.         key = None
  115.         value = None
  116.         b64_block = False
  117.         bd = None
  118.         for line in file:
  119.             # continuation line
  120.             if line.startswith(' '):
  121.                 if b64_block and not binary:
  122.                     continue
  123.                 assert (key != None and value != None)
  124.                 if b64_block:
  125.                     l = base64.b64decode(line)
  126.                     if bd:
  127.                         value += bd.decompress(l)
  128.                     else:
  129.                         if binary == 'compressed':
  130.                             # check gzip header; if absent, we have legacy zlib
  131.                             # data
  132.                             if value.gzipvalue == '' and not l.startswith('\037\213\010'): 
  133.                                 value.legacy_zlib = True
  134.                             value.gzipvalue += l
  135.                         else:
  136.                             # lazy initialization of bd
  137.                             # skip gzip header, if present
  138.                             if l.startswith('\037\213\010'): 
  139.                                 bd = zlib.decompressobj(-zlib.MAX_WBITS)
  140.                                 value = bd.decompress(self._strip_gzip_header(l))
  141.                             else:
  142.                                 # legacy zlib-only format used default block
  143.                                 # size
  144.                                 bd = zlib.decompressobj()
  145.                                 value += bd.decompress(l)
  146.                 else:
  147.                     if len(value) > 0:
  148.                         value += '\n'
  149.                     value += line[1:-1]
  150.             else:
  151.                 if b64_block:
  152.                     if bd:
  153.                         value += bd.flush()
  154.                     b64_block = False
  155.                     bd = None
  156.                 if key:
  157.                     assert value != None
  158.                     self.data[key] = value
  159.                 (key, value) = line.split(':', 1)
  160.                 value = value.strip()
  161.                 if value == 'base64':
  162.                     if binary == 'compressed':
  163.                         value = CompressedValue(key)
  164.                         value.gzipvalue = ''
  165.                     else:
  166.                         value = ''
  167.                     b64_block = True
  168.  
  169.         if key != None:
  170.             self.data[key] = value
  171.  
  172.         self.old_keys = set(self.data.keys())
  173.  
  174.     def has_removed_fields(self):
  175.         '''Check whether the report has any keys which were not loaded in load()
  176.         due to being compressed binary.'''
  177.  
  178.         return ('' in self.itervalues())
  179.  
  180.     def _is_binary(self, string):
  181.         '''Check if the given strings contains binary data.'''
  182.  
  183.         for c in string:
  184.             if c < ' ' and not c.isspace():
  185.                 return True
  186.         return False
  187.  
  188.     def write(self, file, only_new = False):
  189.         '''Write information into the given file-like object.
  190.  
  191.         If only_new is True, only keys which have been added since the last
  192.         load() are written (i. e. those returned by new_keys()).
  193.  
  194.         If a value is a string, it is written directly. Otherwise it must be a
  195.         tuple of the form (file, encode=True, limit=None, fail_on_empty=False).
  196.         The first argument can be a file name or a file-like object,
  197.         which will be read and its content will become the value of this key.
  198.         'encode' specifies whether the contents will be
  199.         gzip compressed and base64-encoded (this defaults to True). If limit is
  200.         set to a positive integer, the entire key will be removed. If
  201.         fail_on_empty is True, reading zero bytes will cause an IOError.
  202.  
  203.         Files are written in RFC822 format.
  204.         '''
  205.         # sort keys into ASCII non-ASCII/binary attachment ones, so that
  206.         # the base64 ones appear last in the report
  207.         asckeys = []
  208.         binkeys = []
  209.         for k in self.data.keys():
  210.             if only_new and k in self.old_keys:
  211.                 continue
  212.             v = self.data[k]
  213.             if hasattr(v, 'find'):
  214.                 if self._is_binary(v):
  215.                     binkeys.append(k)
  216.                 else:
  217.                     asckeys.append(k)
  218.             else:
  219.                 if not isinstance(v, CompressedValue) and len(v) >= 2 and not v[1]: # force uncompressed
  220.                     asckeys.append(k)
  221.                 else:
  222.                     binkeys.append(k)
  223.  
  224.         asckeys.sort()
  225.         if 'ProblemType' in asckeys:
  226.             asckeys.remove('ProblemType')
  227.             asckeys.insert(0, 'ProblemType')
  228.         binkeys.sort()
  229.  
  230.         # write the ASCII keys first
  231.         for k in asckeys:
  232.             v = self.data[k]
  233.  
  234.             # if it's a tuple, we have a file reference; read the contents
  235.             if not hasattr(v, 'find'):
  236.                 if len(v) >= 3 and v[2] != None:
  237.                     limit = v[2]
  238.                 else:
  239.                     limit = None
  240.  
  241.                 fail_on_empty = len(v) >= 4 and v[3]
  242.  
  243.                 if hasattr(v[0], 'read'):
  244.                     v = v[0].read() # file-like object
  245.                 else:
  246.                     v = open(v[0]).read() # file name
  247.  
  248.                 if fail_on_empty and len(v) == 0:
  249.                     raise IOError, 'did not get any data for field ' + k
  250.  
  251.                 if limit != None and len(v) > limit:
  252.                     del self.data[k]
  253.                     continue
  254.  
  255.             if type(v) == type(u''):
  256.                 # unicode ‚Üí str
  257.                 v = v.encode('UTF-8')
  258.  
  259.             if '\n' in v:
  260.                 # multiline value
  261.                 print >> file, k + ':'
  262.                 print >> file, '', v.replace('\n', '\n ')
  263.             else:
  264.                 # single line value
  265.                 print >> file, k + ':', v
  266.  
  267.         # now write the binary keys with gzip compression and base64 encoding
  268.         for k in binkeys:
  269.             v = self.data[k]
  270.             limit = None
  271.             size = 0
  272.  
  273.             curr_pos = file.tell()
  274.             file.write (k + ': base64\n ')
  275.  
  276.             # CompressedValue
  277.             if isinstance(v, CompressedValue):
  278.                 file.write(base64.b64encode(v.gzipvalue))
  279.                 file.write('\n')
  280.                 continue
  281.  
  282.             # write gzip header
  283.             gzip_header = '\037\213\010\010\000\000\000\000\002\377' + k + '\000'
  284.             file.write(base64.b64encode(gzip_header))
  285.             file.write('\n ')
  286.             crc = zlib.crc32('')
  287.  
  288.             bc = zlib.compressobj(9, zlib.DEFLATED, -zlib.MAX_WBITS,
  289.                 zlib.DEF_MEM_LEVEL, 0)
  290.             # direct value
  291.             if hasattr(v, 'find'):
  292.                 size += len(v)
  293.                 crc = zlib.crc32(v, crc)
  294.                 outblock = bc.compress(v)
  295.                 if outblock:
  296.                     file.write(base64.b64encode(outblock))
  297.                     file.write('\n ')
  298.             # file reference
  299.             else:
  300.                 if len(v) >= 3 and v[2] != None:
  301.                     limit = v[2]
  302.  
  303.                 if hasattr(v[0], 'read'):
  304.                     f = v[0] # file-like object
  305.                 else:
  306.                     f = open(v[0]) # file name
  307.                 while True:
  308.                     block = f.read(1048576)
  309.                     size += len(block)
  310.                     crc = zlib.crc32(block, crc)
  311.                     if limit != None:
  312.                         if size > limit:
  313.                             # roll back
  314.                             file.seek(curr_pos)
  315.                             file.truncate(curr_pos)
  316.                             del self.data[k]
  317.                             crc = None
  318.                             break
  319.                     if block:
  320.                         outblock = bc.compress(block)
  321.                         if outblock:
  322.                             file.write(base64.b64encode(outblock))
  323.                             file.write('\n ')
  324.                     else:
  325.                         break
  326.  
  327.                 if len(v) >= 4 and v[3]:
  328.                     if size == 0:
  329.                         raise IOError, 'did not get any data for field %s from %s' % (k, str(v[0]))
  330.  
  331.             # flush compressor and write the rest
  332.             if not limit or size <= limit:
  333.                 block = bc.flush()
  334.                 # append gzip trailer: crc (32 bit) and size (32 bit)
  335.                 if crc:
  336.                     block += struct.pack("<L", crc & 0xFFFFFFFFL)
  337.                     block += struct.pack("<L", size & 0xFFFFFFFFL)
  338.  
  339.                 file.write(base64.b64encode(block))
  340.                 file.write('\n')
  341.  
  342.     def add_to_existing(self, reportfile, keep_times=False):
  343.         '''Add the fields of this report to an already existing report
  344.         file.
  345.  
  346.         The file will be temporarily chmod'ed to 000 to prevent frontends
  347.         from picking up a hal-updated report file. If keep_times
  348.         is True, then the file's atime and mtime restored after updating.'''
  349.  
  350.         st = os.stat(reportfile)
  351.         try:
  352.             f = open(reportfile, 'a')
  353.             os.chmod(reportfile, 0)
  354.             self.write(f)
  355.             f.close()
  356.         finally:
  357.             if keep_times:
  358.                 os.utime(reportfile, (st.st_atime, st.st_mtime))
  359.             os.chmod(reportfile, st.st_mode)
  360.  
  361.     def write_mime(self, file, attach_treshold = 5, extra_headers={},
  362.         skip_keys=None):
  363.         '''Write information into the given file-like object, using
  364.         MIME/Multipart RFC 2822 format (i. e. an email with attachments).
  365.  
  366.         If a value is a string or a CompressedValue, it is written directly.
  367.         Otherwise it must be a tuple containing the source file and an optional
  368.         boolean value (in that order); the first argument can be a file name or
  369.         a file-like object, which will be read and its content will become the
  370.         value of this key.  The file will be gzip compressed, unless the key
  371.         already ends in .gz.
  372.  
  373.         attach_treshold specifies the maximum number of lines for a value to be
  374.         included into the first inline text part. All bigger values (as well as
  375.         all non-ASCII ones) will become an attachment.
  376.  
  377.         Extra MIME preamble headers can be specified, too, as a dictionary.
  378.  
  379.         skip_keys is a set/list specifying keys which are filtered out and not
  380.         written to the destination file.
  381.         '''
  382.  
  383.         keys = self.data.keys()
  384.         keys.sort()
  385.  
  386.         text = ''
  387.         attachments = []
  388.  
  389.         if 'ProblemType' in keys:
  390.             keys.remove('ProblemType')
  391.             keys.insert(0, 'ProblemType')
  392.  
  393.         for k in keys:
  394.             if skip_keys and k in skip_keys:
  395.                 continue
  396.             v = self.data[k]
  397.             attach_value = None
  398.  
  399.             # compressed values are ready for attaching in gzip form
  400.             if isinstance(v, CompressedValue):
  401.                 attach_value = v.gzipvalue
  402.  
  403.             # if it's a tuple, we have a file reference; read the contents
  404.             # and gzip it
  405.             elif not hasattr(v, 'find'):
  406.                 attach_value = ''
  407.                 if hasattr(v[0], 'read'):
  408.                     f = v[0] # file-like object
  409.                 else:
  410.                     f = open(v[0]) # file name
  411.                 if k.endswith('.gz'):
  412.                     attach_value = f.read()
  413.                 else:
  414.                     io = StringIO()
  415.                     gf = gzip.GzipFile(k, mode='wb', fileobj=io)
  416.                     while True:
  417.                         block = f.read(1048576)
  418.                         if block:
  419.                             gf.write(block)
  420.                         else:
  421.                             gf.close()
  422.                             break
  423.                     attach_value = io.getvalue()
  424.                 f.close()
  425.  
  426.             # binary value
  427.             elif self._is_binary(v):
  428.                 if k.endswith('.gz'):
  429.                     attach_value = v
  430.                 else:
  431.                     attach_value = CompressedValue(v, k).gzipvalue
  432.  
  433.             # if we have an attachment value, create an attachment
  434.             if attach_value:
  435.                 att = MIMEBase('application', 'x-gzip')
  436.                 if k.endswith('.gz'):
  437.                     att.add_header('Content-Disposition', 'attachment', filename=k)
  438.                 else:
  439.                     att.add_header('Content-Disposition', 'attachment', filename=k+'.gz')
  440.                 att.set_payload(attach_value)
  441.                 encode_base64(att)
  442.                 attachments.append(att)
  443.             else:
  444.                 # plain text value
  445.                 if type(v) == type(u''):
  446.                     # convert unicode to UTF-8 str
  447.                     v = v.encode('UTF-8')
  448.  
  449.                 lines = len(v.splitlines())
  450.                 if lines == 1:
  451.                     v = v.rstrip()
  452.                     text += '%s: %s\n' % (k, v)
  453.                 elif lines <= attach_treshold:
  454.                     text += '%s:\n ' % k
  455.                     if not v.endswith('\n'):
  456.                         v += '\n'
  457.                     text += v.strip().replace('\n', '\n ') + '\n'
  458.                 else:
  459.                     # too large, separate attachment
  460.                     att = MIMEText(v, _charset='UTF-8')
  461.                     att.add_header('Content-Disposition', 'attachment', filename=k+'.txt')
  462.                     attachments.append(att)
  463.  
  464.         # create initial text attachment
  465.         att = MIMEText(text, _charset='UTF-8')
  466.         att.add_header('Content-Disposition', 'inline')
  467.         attachments.insert(0, att)
  468.  
  469.         msg = MIMEMultipart()
  470.         for k, v in extra_headers.iteritems():
  471.             msg.add_header(k, v)
  472.         for a in attachments:
  473.             msg.attach(a)
  474.  
  475.         print >> file, msg.as_string()
  476.  
  477.     def __setitem__(self, k, v):
  478.         assert hasattr(k, 'isalnum')
  479.         assert k.replace('.', '').isalnum()
  480.         # value must be a string or a CompressedValue or a file reference
  481.         # (tuple (string|file [, bool]))
  482.         assert (isinstance(v, CompressedValue) or hasattr(v, 'isalnum') or
  483.             (hasattr(v, '__getitem__') and (
  484.             len(v) == 1 or (len(v) >= 2 and v[1] in (True, False)))
  485.             and (hasattr(v[0], 'isalnum') or hasattr(v[0], 'read'))))
  486.  
  487.         return self.data.__setitem__(k, v)
  488.  
  489.     def new_keys(self):
  490.         '''Return the set of keys which have been added to the report since it
  491.         was constructed or loaded.'''
  492.  
  493.         return set(self.data.keys()) - self.old_keys
  494.  
  495.     @classmethod
  496.     def _strip_gzip_header(klass, line):
  497.         '''Strip gzip header from line and return the rest.'''
  498.  
  499.         flags = ord(line[3])
  500.         offset = 10
  501.         if flags & 4: # FLG.FEXTRA
  502.             offset += line[offset] + 1
  503.         if flags & 8: # FLG.FNAME
  504.             while ord(line[offset]) != 0:
  505.                 offset += 1
  506.             offset += 1
  507.         if flags & 16: # FLG.FCOMMENT
  508.             while ord(line[offset]) != 0:
  509.                 offset += 1
  510.             offset += 1
  511.         if flags & 2: # FLG.FHCRC
  512.             offset += 2
  513.  
  514.         return line[offset:]
  515.  
  516. #
  517. # Unit test
  518. #
  519.  
  520. import unittest, tempfile, os, email
  521.  
  522. class _ProblemReportTest(unittest.TestCase):
  523.     def test_basic_operations(self):
  524.         '''basic creation and operation.'''
  525.  
  526.         pr = ProblemReport()
  527.         pr['foo'] = 'bar'
  528.         pr['bar'] = ' foo   bar\nbaz\n   blip  '
  529.         self.assertEqual(pr['foo'], 'bar')
  530.         self.assertEqual(pr['bar'], ' foo   bar\nbaz\n   blip  ')
  531.         self.assertEqual(pr['ProblemType'], 'Crash')
  532.         self.assert_(time.strptime(pr['Date']))
  533.  
  534.     def test_ctor_arguments(self):
  535.         '''non-default constructor arguments.'''
  536.  
  537.         pr = ProblemReport('KernelCrash')
  538.         self.assertEqual(pr['ProblemType'], 'KernelCrash')
  539.         pr = ProblemReport(date = '19801224 12:34')
  540.         self.assertEqual(pr['Date'], '19801224 12:34')
  541.  
  542.     def test_sanity_checks(self):
  543.         '''various error conditions.'''
  544.  
  545.         pr = ProblemReport()
  546.         self.assertRaises(AssertionError, pr.__setitem__, 'a b', '1')
  547.         self.assertRaises(AssertionError, pr.__setitem__, 'a', 1)
  548.         self.assertRaises(AssertionError, pr.__setitem__, 'a', 1)
  549.         self.assertRaises(AssertionError, pr.__setitem__, 'a', (1,))
  550.         self.assertRaises(AssertionError, pr.__setitem__, 'a', ('/tmp/nonexistant', ''))
  551.         self.assertRaises(KeyError, pr.__getitem__, 'Nonexistant')
  552.  
  553.     def test_compressed_values(self):
  554.         '''handling of CompressedValue values.'''
  555.  
  556.         large_val = 'A' * 5000000
  557.  
  558.         pr = ProblemReport()
  559.         pr['Foo'] = CompressedValue('FooFoo!')
  560.         pr['Bin'] = CompressedValue()
  561.         pr['Bin'].set_value('AB' * 10 + '\0' * 10 + 'Z')
  562.         pr['Large'] = CompressedValue(large_val)
  563.  
  564.         self.assert_(isinstance(pr['Foo'], CompressedValue))
  565.         self.assert_(isinstance(pr['Bin'], CompressedValue))
  566.         self.assertEqual(pr['Foo'].get_value(), 'FooFoo!')
  567.         self.assertEqual(pr['Bin'].get_value(), 'AB' * 10 + '\0' * 10 + 'Z')
  568.         self.assertEqual(pr['Large'].get_value(), large_val)
  569.         self.assertEqual(len(pr['Foo']), 7)
  570.         self.assertEqual(len(pr['Bin']), 31)
  571.         self.assertEqual(len(pr['Large']), len(large_val))
  572.  
  573.         io = StringIO()
  574.         pr['Bin'].write(io)
  575.         self.assertEqual(io.getvalue(), 'AB' * 10 + '\0' * 10 + 'Z')
  576.         io = StringIO()
  577.         pr['Large'].write(io)
  578.         self.assertEqual(io.getvalue(), large_val)
  579.  
  580.         pr['Multiline'] = CompressedValue('\1\1\1\n\2\2\n\3\3\3')
  581.         self.assertEqual(pr['Multiline'].splitlines(), 
  582.             ['\1\1\1', '\2\2', '\3\3\3'])
  583.  
  584.         # test writing of reports with CompressedValues
  585.         io = StringIO()
  586.         pr.write(io)
  587.         io.seek(0)
  588.         pr = ProblemReport()
  589.         pr.load(io)
  590.         self.assertEqual(pr['Foo'], 'FooFoo!')
  591.         self.assertEqual(pr['Bin'], 'AB' * 10 + '\0' * 10 + 'Z')
  592.         self.assertEqual(pr['Large'], large_val)
  593.  
  594.     def test_write(self):
  595.         '''write() and proper formatting.'''
  596.  
  597.         pr = ProblemReport(date = 'now!')
  598.         pr['Simple'] = 'bar'
  599.         pr['SimpleUTF8'] = '1√§√∂2Œ¶3'
  600.         pr['SimpleUnicode'] = u'1√§√∂2Œ¶3'
  601.         pr['TwoLineUTF8'] = 'pi-œÄ\nnu-Œ∑'
  602.         pr['TwoLineUnicode'] = u'pi-œÄ\nnu-Œ∑'
  603.         pr['WhiteSpace'] = ' foo   bar\nbaz\n  blip  \n\nafteremptyline'
  604.         io = StringIO()
  605.         pr.write(io)
  606.         self.assertEqual(io.getvalue(), 
  607. '''ProblemType: Crash
  608. Date: now!
  609. Simple: bar
  610. SimpleUTF8: 1√§√∂2Œ¶3
  611. SimpleUnicode: 1√§√∂2Œ¶3
  612. TwoLineUTF8:
  613.  pi-œÄ
  614.  nu-Œ∑
  615. TwoLineUnicode:
  616.  pi-œÄ
  617.  nu-Œ∑
  618. WhiteSpace:
  619.   foo   bar
  620.  baz
  621.    blip  
  622.  
  623.  afteremptyline
  624. ''')
  625.  
  626.     def test_write_append(self):
  627.         '''write() with appending to an existing file.'''
  628.  
  629.         pr = ProblemReport(date = 'now!')
  630.         pr['Simple'] = 'bar'
  631.         pr['WhiteSpace'] = ' foo   bar\nbaz\n  blip  '
  632.         io = StringIO()
  633.         pr.write(io)
  634.  
  635.         pr.clear()
  636.         pr['Extra'] = 'appended'
  637.         pr.write(io)
  638.  
  639.         self.assertEqual(io.getvalue(), 
  640. '''ProblemType: Crash
  641. Date: now!
  642. Simple: bar
  643. WhiteSpace:
  644.   foo   bar
  645.  baz
  646.    blip  
  647. Extra: appended
  648. ''')
  649.  
  650.         temp = tempfile.NamedTemporaryFile()
  651.         temp.write('AB' * 10 + '\0' * 10 + 'Z')
  652.         temp.flush()
  653.  
  654.         pr = ProblemReport(date = 'now!')
  655.         pr['File'] = (temp.name,)
  656.         io = StringIO()
  657.         pr.write(io)
  658.         temp.close()
  659.  
  660.         pr.clear()
  661.         pr['Extra'] = 'appended'
  662.         pr.write(io)
  663.  
  664.         io.seek(0)
  665.         pr = ProblemReport()
  666.         pr.load(io)
  667.  
  668.         self.assertEqual(pr['Date'], 'now!')
  669.         self.assertEqual(pr['File'], 'AB' * 10 + '\0' * 10 + 'Z')
  670.         self.assertEqual(pr['Extra'], 'appended')
  671.  
  672.     def test_load(self):
  673.         '''load() with various formatting.'''
  674.         pr = ProblemReport()
  675.         pr.load(StringIO(
  676. '''ProblemType: Crash
  677. Date: now!
  678. Simple: bar
  679. WhiteSpace:
  680.   foo   bar
  681.  baz
  682.    blip  
  683. '''))
  684.         self.assertEqual(pr['ProblemType'], 'Crash')
  685.         self.assertEqual(pr['Date'], 'now!')
  686.         self.assertEqual(pr['Simple'], 'bar')
  687.         self.assertEqual(pr['WhiteSpace'], ' foo   bar\nbaz\n  blip  ')
  688.  
  689.         # test last field a bit more
  690.         pr.load(StringIO(
  691. '''ProblemType: Crash
  692. Date: now!
  693. Simple: bar
  694. WhiteSpace:
  695.   foo   bar
  696.  baz
  697.    blip  
  698.  
  699. '''))
  700.         self.assertEqual(pr['ProblemType'], 'Crash')
  701.         self.assertEqual(pr['Date'], 'now!')
  702.         self.assertEqual(pr['Simple'], 'bar')
  703.         self.assertEqual(pr['WhiteSpace'], ' foo   bar\nbaz\n  blip  \n')
  704.  
  705.         pr = ProblemReport()
  706.         pr.load(StringIO(
  707. '''ProblemType: Crash
  708. WhiteSpace:
  709.   foo   bar
  710.  baz
  711.  
  712.    blip  
  713. Last: foo
  714. '''))
  715.         self.assertEqual(pr['WhiteSpace'], ' foo   bar\nbaz\n\n  blip  ')
  716.         self.assertEqual(pr['Last'], 'foo')
  717.  
  718.         pr.load(StringIO(
  719. '''ProblemType: Crash
  720. WhiteSpace:
  721.   foo   bar
  722.  baz
  723.    blip  
  724. Last: foo
  725.  
  726. '''))
  727.         self.assertEqual(pr['WhiteSpace'], ' foo   bar\nbaz\n  blip  ')
  728.         self.assertEqual(pr['Last'], 'foo\n')
  729.  
  730.         # empty lines in values must have a leading space in coding
  731.         invalid_spacing = StringIO('''WhiteSpace:
  732.  first
  733.  
  734.  second
  735. ''')
  736.         pr = ProblemReport()
  737.         self.assertRaises(ValueError, pr.load, invalid_spacing)
  738.  
  739.         # test that load() cleans up properly
  740.         pr.load(StringIO('ProblemType: Crash'))
  741.         self.assertEqual(pr.keys(), ['ProblemType'])
  742.  
  743.     def test_write_file(self):
  744.         '''writing a report with binary file data.'''
  745.  
  746.         temp = tempfile.NamedTemporaryFile()
  747.         temp.write('AB' * 10 + '\0' * 10 + 'Z')
  748.         temp.flush()
  749.  
  750.         pr = ProblemReport(date = 'now!')
  751.         pr['File'] = (temp.name,)
  752.         pr['Afile'] = (temp.name,)
  753.         io = StringIO()
  754.         pr.write(io)
  755.         temp.close()
  756.  
  757.         self.assertEqual(io.getvalue(),
  758. '''ProblemType: Crash
  759. Date: now!
  760. Afile: base64
  761.  H4sICAAAAAAC/0FmaWxlAA==
  762.  c3RyxIAMcBAFAK/2p9MfAAAA
  763. File: base64
  764.  H4sICAAAAAAC/0ZpbGUA
  765.  c3RyxIAMcBAFAK/2p9MfAAAA
  766. ''')
  767.  
  768.         # force compression/encoding bool
  769.         temp = tempfile.NamedTemporaryFile()
  770.         temp.write('foo\0bar')
  771.         temp.flush()
  772.         pr = ProblemReport(date = 'now!')
  773.         pr['File'] = (temp.name, False)
  774.         io = StringIO()
  775.         pr.write(io)
  776.  
  777.         self.assertEqual(io.getvalue(),
  778. '''ProblemType: Crash
  779. Date: now!
  780. File: foo\0bar
  781. ''')
  782.  
  783.         pr['File'] = (temp.name, True)
  784.         io = StringIO()
  785.         pr.write(io)
  786.  
  787.         self.assertEqual(io.getvalue(),
  788. '''ProblemType: Crash
  789. Date: now!
  790. File: base64
  791.  H4sICAAAAAAC/0ZpbGUA
  792.  S8vPZ0hKLAIACq50HgcAAAA=
  793. ''')
  794.         temp.close()
  795.  
  796.     def test_write_fileobj(self):
  797.         '''writing a report with a pointer to a file-like object.'''
  798.  
  799.         tempbin = StringIO('AB' * 10 + '\0' * 10 + 'Z')
  800.         tempasc = StringIO('Hello World')
  801.  
  802.         pr = ProblemReport(date = 'now!')
  803.         pr['BinFile'] = (tempbin,)
  804.         pr['AscFile'] = (tempasc, False)
  805.         io = StringIO()
  806.         pr.write(io)
  807.         io.seek(0)
  808.  
  809.         pr = ProblemReport()
  810.         pr.load(io)
  811.         self.assertEqual(pr['BinFile'], tempbin.getvalue())
  812.         self.assertEqual(pr['AscFile'], tempasc.getvalue())
  813.  
  814.     def test_write_empty_fileobj(self):
  815.         '''writing a report with a pointer to a file-like object with enforcing non-emptyness.'''
  816.  
  817.         tempbin = StringIO('')
  818.         tempasc = StringIO('')
  819.  
  820.         pr = ProblemReport(date = 'now!')
  821.         pr['BinFile'] = (tempbin, True, None, True)
  822.         io = StringIO()
  823.         self.assertRaises(IOError, pr.write, io)
  824.  
  825.         pr = ProblemReport(date = 'now!')
  826.         pr['AscFile'] = (tempasc, False, None, True)
  827.         io = StringIO()
  828.         self.assertRaises(IOError, pr.write, io)
  829.  
  830.     def test_write_delayed_fileobj(self):
  831.         '''writing a report with file pointers and delayed data.'''
  832.  
  833.         (fout, fin) = os.pipe()
  834.  
  835.         if os.fork() == 0:
  836.             os.close(fout)
  837.             time.sleep(0.3)
  838.             os.write(fin, 'ab' * 512*1024)
  839.             time.sleep(0.3)
  840.             os.write(fin, 'hello')
  841.             time.sleep(0.3)
  842.             os.write(fin, ' world')
  843.             os.close(fin)
  844.             os._exit(0)
  845.  
  846.         os.close(fin)
  847.  
  848.         pr = ProblemReport(date = 'now!')
  849.         pr['BinFile'] = (os.fdopen(fout),)
  850.         io = StringIO()
  851.         pr.write(io)
  852.         assert os.wait()[1] == 0
  853.  
  854.         io.seek(0)
  855.  
  856.         pr2 = ProblemReport()
  857.         pr2.load(io)
  858.         self.assert_(pr2['BinFile'].endswith('abhello world'))
  859.         self.assertEqual(len(pr2['BinFile']), 1048576 + len('hello world'))
  860.  
  861.     def test_read_file(self):
  862.         '''reading a report with binary data.'''
  863.  
  864.         bin_report = '''ProblemType: Crash
  865. Date: now!
  866. File: base64
  867.  H4sICAAAAAAC/0ZpbGUA
  868.  c3RyxIAMcBAFAK/2p9MfAAAA
  869. Foo: Bar
  870. '''
  871.  
  872.         # test with reading everything
  873.         pr = ProblemReport()
  874.         pr.load(StringIO(bin_report))
  875.         self.assertEqual(pr['File'], 'AB' * 10 + '\0' * 10 + 'Z')
  876.         self.assertEqual(pr.has_removed_fields(), False)
  877.  
  878.         # test with skipping binary data
  879.         pr.load(StringIO(bin_report), binary=False)
  880.         self.assertEqual(pr['File'], '')
  881.         self.assertEqual(pr.has_removed_fields(), True)
  882.  
  883.         # test with keeping compressed binary data
  884.         pr.load(StringIO(bin_report), binary='compressed')
  885.         self.assertEqual(pr['Foo'], 'Bar')
  886.         self.assertEqual(pr.has_removed_fields(), False)
  887.         self.assert_(isinstance(pr['File'], CompressedValue))
  888.  
  889.         self.assertEqual(pr['File'].get_value(), 'AB' * 10 + '\0' * 10 + 'Z')
  890.  
  891.     def test_read_file_legacy(self):
  892.         '''reading a report with binary data in legacy format without gzip
  893.         header.'''
  894.  
  895.         bin_report = '''ProblemType: Crash
  896. Date: now!
  897. File: base64
  898.  eJw=
  899.  c3RyxIAMcBAFAG55BXk=
  900. Foo: Bar
  901. '''
  902.  
  903.         data = 'AB' * 10 + '\0' * 10 + 'Z'
  904.  
  905.         # test with reading everything
  906.         pr = ProblemReport()
  907.         pr.load(StringIO(bin_report))
  908.         self.assertEqual(pr['File'], data)
  909.         self.assertEqual(pr.has_removed_fields(), False)
  910.  
  911.         # test with skipping binary data
  912.         pr.load(StringIO(bin_report), binary=False)
  913.         self.assertEqual(pr['File'], '')
  914.         self.assertEqual(pr.has_removed_fields(), True)
  915.  
  916.         # test with keeping CompressedValues
  917.         pr.load(StringIO(bin_report), binary='compressed')
  918.         self.assertEqual(pr.has_removed_fields(), False)
  919.         self.assertEqual(len(pr['File']), len(data))
  920.         self.assertEqual(pr['File'].get_value(), data)
  921.         io = StringIO()
  922.         pr['File'].write(io)
  923.         io.seek(0)
  924.         self.assertEqual(io.read(), data)
  925.  
  926.     def test_big_file(self):
  927.         '''writing and re-decoding a big random file.'''
  928.  
  929.         # create 1 MB random file
  930.         temp = tempfile.NamedTemporaryFile()
  931.         data = os.urandom(1048576)
  932.         temp.write(data)
  933.         temp.flush()
  934.  
  935.         # write it into problem report
  936.         pr = ProblemReport()
  937.         pr['File'] = (temp.name,)
  938.         pr['Before'] = 'xtestx'
  939.         pr['ZAfter'] = 'ytesty'
  940.         io = StringIO()
  941.         pr.write(io)
  942.         temp.close()
  943.  
  944.         # read it again
  945.         io.seek(0)
  946.         pr = ProblemReport()
  947.         pr.load(io)
  948.  
  949.         self.assert_(pr['File'] == data)
  950.         self.assertEqual(pr['Before'], 'xtestx')
  951.         self.assertEqual(pr['ZAfter'], 'ytesty')
  952.  
  953.         # write it again
  954.         io2 = StringIO()
  955.         pr.write(io2)
  956.         self.assert_(io.getvalue() == io2.getvalue())
  957.  
  958.         # check gzip compatibility
  959.         io.seek(0)
  960.         pr = ProblemReport()
  961.         pr.load(io, binary='compressed')
  962.         self.assertEqual(pr['File'].get_value(), data)
  963.  
  964.     def test_size_limit(self):
  965.         '''writing and a big random file with a size limit key.'''
  966.  
  967.         # create 1 MB random file
  968.         temp = tempfile.NamedTemporaryFile()
  969.         data = os.urandom(1048576)
  970.         temp.write(data)
  971.         temp.flush()
  972.  
  973.         # write it into problem report
  974.         pr = ProblemReport()
  975.         pr['FileSmallLimit'] = (temp.name, True, 100)
  976.         pr['FileLimitMinus1'] = (temp.name, True, 1048575)
  977.         pr['FileExactLimit'] = (temp.name, True, 1048576)
  978.         pr['FileLimitPlus1'] = (temp.name, True, 1048577)
  979.         pr['FileLimitNone'] = (temp.name, True, None)
  980.         pr['Before'] = 'xtestx'
  981.         pr['ZAfter'] = 'ytesty'
  982.         io = StringIO()
  983.         pr.write(io)
  984.         temp.close()
  985.  
  986.         # read it again
  987.         io.seek(0)
  988.         pr = ProblemReport()
  989.         pr.load(io)
  990.  
  991.         self.failIf(pr.has_key('FileSmallLimit'))
  992.         self.failIf(pr.has_key('FileLimitMinus1'))
  993.         self.assert_(pr['FileExactLimit'] == data)
  994.         self.assert_(pr['FileLimitPlus1'] == data)
  995.         self.assert_(pr['FileLimitNone'] == data)
  996.         self.assertEqual(pr['Before'], 'xtestx')
  997.         self.assertEqual(pr['ZAfter'], 'ytesty')
  998.  
  999.     def test_iter(self):
  1000.         '''ProblemReport iteration.'''
  1001.  
  1002.         pr = ProblemReport()
  1003.         pr['foo'] = 'bar'
  1004.  
  1005.         keys = []
  1006.         for k in pr:
  1007.             keys.append(k)
  1008.         keys.sort()
  1009.         self.assertEqual(' '.join(keys), 'Date ProblemType foo')
  1010.  
  1011.         self.assertEqual(len([k for k in pr if k != 'foo']), 2)
  1012.  
  1013.     def test_modify(self):
  1014.         '''reading, modifying fields, and writing back.'''
  1015.  
  1016.         report = '''ProblemType: Crash
  1017. Date: now!
  1018. Long:
  1019.  xxx
  1020.  .
  1021.  yyy
  1022. Short: Bar
  1023. File: base64
  1024.  H4sICAAAAAAC/0ZpbGUA
  1025.  c3RyxIAMcBAFAK/2p9MfAAAA
  1026. '''
  1027.  
  1028.         pr = ProblemReport()
  1029.         pr.load(StringIO(report))
  1030.  
  1031.         self.assertEqual(pr['Long'], 'xxx\n.\nyyy')
  1032.  
  1033.         # write back unmodified
  1034.         io = StringIO()
  1035.         pr.write(io)
  1036.         self.assertEqual(io.getvalue(), report)
  1037.  
  1038.         pr['Short'] = 'aaa\nbbb'
  1039.         pr['Long'] = '123'
  1040.         io = StringIO()
  1041.         pr.write(io)
  1042.         self.assertEqual(io.getvalue(),
  1043. '''ProblemType: Crash
  1044. Date: now!
  1045. Long: 123
  1046. Short:
  1047.  aaa
  1048.  bbb
  1049. File: base64
  1050.  H4sICAAAAAAC/0ZpbGUA
  1051.  c3RyxIAMcBAFAK/2p9MfAAAA
  1052. ''')
  1053.  
  1054.     def test_add_to_existing(self):
  1055.         '''adding information to an existing report.'''
  1056.  
  1057.         # original report
  1058.         pr = ProblemReport()
  1059.         pr['old1'] = '11'
  1060.         pr['old2'] = '22'
  1061.  
  1062.         (fd, rep) = tempfile.mkstemp()
  1063.         os.close(fd)
  1064.         pr.write(open(rep, 'w'))
  1065.  
  1066.         origstat = os.stat(rep)
  1067.  
  1068.         # create a new one and add it
  1069.         pr = ProblemReport()
  1070.         pr.clear()
  1071.         pr['new1'] = '33'
  1072.  
  1073.         pr.add_to_existing(rep, keep_times=True)
  1074.  
  1075.         # check keep_times
  1076.         newstat = os.stat(rep)
  1077.         self.assertEqual(origstat.st_mode, newstat.st_mode)
  1078.         self.assertAlmostEqual(origstat.st_atime, newstat.st_atime, 1)
  1079.         self.assertAlmostEqual(origstat.st_mtime, newstat.st_mtime, 1)
  1080.  
  1081.         # check report contents
  1082.         newpr = ProblemReport()
  1083.         newpr.load(open(rep))
  1084.         self.assertEqual(newpr['old1'], '11')
  1085.         self.assertEqual(newpr['old2'], '22')
  1086.         self.assertEqual(newpr['new1'], '33')
  1087.  
  1088.         # create a another new one and add it, but make sure mtime must be
  1089.         # different
  1090.         time.sleep(1)
  1091.         open(rep).read() # bump atime
  1092.         time.sleep(1)
  1093.  
  1094.         pr = ProblemReport()
  1095.         pr.clear()
  1096.         pr['new2'] = '44'
  1097.  
  1098.         pr.add_to_existing(rep)
  1099.  
  1100.         # check that timestamps have been updates
  1101.         newstat = os.stat(rep)
  1102.         self.assertEqual(origstat.st_mode, newstat.st_mode)
  1103.         self.assertNotEqual(origstat.st_mtime, newstat.st_mtime)
  1104.         # skip atime check if filesystem is mounted noatime
  1105.         skip_atime = False
  1106.         dir = rep
  1107.         while len(dir)>1:
  1108.             dir, filename = os.path.split(dir)
  1109.             if os.path.ismount(dir):
  1110.                 for line in open('/proc/mounts'):
  1111.                     mount, fs, options = line.split(' ')[1:4]
  1112.                     if mount == dir and 'noatime' in options.split(','):
  1113.                         skip_atime = True
  1114.                         break
  1115.                 break
  1116.         if not skip_atime:
  1117.             self.assertNotEqual(origstat.st_atime, newstat.st_atime)
  1118.  
  1119.         # check report contents
  1120.         newpr = ProblemReport()
  1121.         newpr.load(open(rep))
  1122.         self.assertEqual(newpr['old1'], '11')
  1123.         self.assertEqual(newpr['old2'], '22')
  1124.         self.assertEqual(newpr['new1'], '33')
  1125.         self.assertEqual(newpr['new2'], '44')
  1126.  
  1127.         os.unlink(rep)
  1128.  
  1129.     def test_write_mime_text(self):
  1130.         '''write_mime() for text values.'''
  1131.  
  1132.         pr = ProblemReport(date = 'now!')
  1133.         pr['Simple'] = 'bar'
  1134.         pr['SimpleUTF8'] = '1√§√∂2Œ¶3'
  1135.         pr['SimpleUnicode'] = u'1√§√∂2Œ¶3'
  1136.         pr['SimpleLineEnd'] = 'bar\n'
  1137.         pr['TwoLine'] = 'first\nsecond\n'
  1138.         pr['TwoLineUTF8'] = 'pi-œÄ\nnu-Œ∑\n'
  1139.         pr['TwoLineUnicode'] = u'pi-œÄ\nnu-Œ∑\n'
  1140.         pr['InlineMargin'] = 'first\nsecond\nthird\nfourth\nfifth\n'
  1141.         pr['Multiline'] = ' foo   bar\nbaz\n  blip  \nline4\nline‚ô•5!!\n≈ǃ±¬µ‚Ǩ ‚Öù\n'
  1142.         io = StringIO()
  1143.         pr.write_mime(io)
  1144.         io.seek(0)
  1145.  
  1146.         msg = email.message_from_file(io)
  1147.         msg_iter = msg.walk()
  1148.  
  1149.         # first part is the multipart container
  1150.         part = msg_iter.next()
  1151.         self.assert_(part.is_multipart())
  1152.  
  1153.         # second part should be an inline text/plain attachments with all short
  1154.         # fields
  1155.         part = msg_iter.next()
  1156.         self.assert_(not part.is_multipart())
  1157.         self.assertEqual(part.get_content_type(), 'text/plain')
  1158.         self.assertEqual(part.get_content_charset(), 'utf-8')
  1159.         self.assertEqual(part.get_filename(), None)
  1160.         self.assertEqual(part.get_payload(decode=True), '''ProblemType: Crash
  1161. Date: now!
  1162. InlineMargin:
  1163.  first
  1164.  second
  1165.  third
  1166.  fourth
  1167.  fifth
  1168. Simple: bar
  1169. SimpleLineEnd: bar
  1170. SimpleUTF8: 1√§√∂2Œ¶3
  1171. SimpleUnicode: 1√§√∂2Œ¶3
  1172. TwoLine:
  1173.  first
  1174.  second
  1175. TwoLineUTF8:
  1176.  pi-œÄ
  1177.  nu-Œ∑
  1178. TwoLineUnicode:
  1179.  pi-œÄ
  1180.  nu-Œ∑
  1181. ''')
  1182.  
  1183.         # third part should be the Multiline: field as attachment
  1184.         part = msg_iter.next()
  1185.         self.assert_(not part.is_multipart())
  1186.         self.assertEqual(part.get_content_type(), 'text/plain')
  1187.         self.assertEqual(part.get_content_charset(), 'utf-8')
  1188.         self.assertEqual(part.get_filename(), 'Multiline.txt')
  1189.         self.assertEqual(part.get_payload(decode=True), ''' foo   bar
  1190. baz
  1191.   blip  
  1192. line4
  1193. line‚ô•5!!
  1194. ≈ǃ±¬µ‚Ǩ ‚Öù
  1195. ''')
  1196.  
  1197.         # no more parts
  1198.         self.assertRaises(StopIteration, msg_iter.next)
  1199.  
  1200.     def test_write_mime_binary(self):
  1201.         '''write_mime() for binary values and file references.'''
  1202.  
  1203.         bin_value = 'AB' * 10 + '\0' * 10 + 'Z'
  1204.  
  1205.         temp = tempfile.NamedTemporaryFile()
  1206.         temp.write(bin_value)
  1207.         temp.flush()
  1208.  
  1209.         tempgz = tempfile.NamedTemporaryFile()
  1210.         gz = gzip.GzipFile('File1', 'w', fileobj=tempgz)
  1211.         gz.write(bin_value)
  1212.         gz.close()
  1213.         tempgz.flush()
  1214.  
  1215.         pr = ProblemReport(date = 'now!')
  1216.         pr['Context'] = 'Test suite'
  1217.         pr['File1'] = (temp.name,)
  1218.         pr['File1.gz'] = (tempgz.name,)
  1219.         pr['Value1'] = bin_value
  1220.         pr['Value1.gz'] = open(tempgz.name).read()
  1221.         pr['ZValue'] = CompressedValue(bin_value)
  1222.         io = StringIO()
  1223.         pr.write_mime(io)
  1224.         io.seek(0)
  1225.  
  1226.         msg = email.message_from_file(io)
  1227.         msg_iter = msg.walk()
  1228.  
  1229.         # first part is the multipart container
  1230.         part = msg_iter.next()
  1231.         self.assert_(part.is_multipart())
  1232.  
  1233.         # second part should be an inline text/plain attachments with all short
  1234.         # fields
  1235.         part = msg_iter.next()
  1236.         self.assert_(not part.is_multipart())
  1237.         self.assertEqual(part.get_content_type(), 'text/plain')
  1238.         self.assertEqual(part.get_content_charset(), 'utf-8')
  1239.         self.assertEqual(part.get_filename(), None)
  1240.         self.assertEqual(part.get_payload(decode=True),
  1241.             'ProblemType: Crash\nContext: Test suite\nDate: now!\n')
  1242.  
  1243.         # third part should be the File1: file contents as gzip'ed attachment
  1244.         part = msg_iter.next()
  1245.         self.assert_(not part.is_multipart())
  1246.         self.assertEqual(part.get_content_type(), 'application/x-gzip')
  1247.         self.assertEqual(part.get_filename(), 'File1.gz')
  1248.         f = tempfile.TemporaryFile()
  1249.         f.write(part.get_payload(decode=True))
  1250.         f.seek(0)
  1251.         self.assertEqual(gzip.GzipFile(mode='rb', fileobj=f).read(), bin_value)
  1252.  
  1253.         # fourth part should be the File1.gz: file contents as gzip'ed
  1254.         # attachment; write_mime() should not compress it again
  1255.         part = msg_iter.next()
  1256.         self.assert_(not part.is_multipart())
  1257.         self.assertEqual(part.get_content_type(), 'application/x-gzip')
  1258.         self.assertEqual(part.get_filename(), 'File1.gz')
  1259.         f = tempfile.TemporaryFile()
  1260.         f.write(part.get_payload(decode=True))
  1261.         f.seek(0)
  1262.         self.assertEqual(gzip.GzipFile(mode='rb', fileobj=f).read(), bin_value)
  1263.  
  1264.         # fifth part should be the Value1: value as gzip'ed attachment
  1265.         part = msg_iter.next()
  1266.         self.assert_(not part.is_multipart())
  1267.         self.assertEqual(part.get_content_type(), 'application/x-gzip')
  1268.         self.assertEqual(part.get_filename(), 'Value1.gz')
  1269.         f = tempfile.TemporaryFile()
  1270.         f.write(part.get_payload(decode=True))
  1271.         f.seek(0)
  1272.         self.assertEqual(gzip.GzipFile(mode='rb', fileobj=f).read(), bin_value)
  1273.  
  1274.         # sixth part should be the Value1: value as gzip'ed attachment;
  1275.         # write_mime should not compress it again
  1276.         part = msg_iter.next()
  1277.         self.assert_(not part.is_multipart())
  1278.         self.assertEqual(part.get_content_type(), 'application/x-gzip')
  1279.         self.assertEqual(part.get_filename(), 'Value1.gz')
  1280.         f = tempfile.TemporaryFile()
  1281.         f.write(part.get_payload(decode=True))
  1282.         f.seek(0)
  1283.         self.assertEqual(gzip.GzipFile(mode='rb', fileobj=f).read(), bin_value)
  1284.  
  1285.         # seventh part should be the ZValue: value as gzip'ed attachment;
  1286.         # write_mime should not compress it again
  1287.         part = msg_iter.next()
  1288.         self.assert_(not part.is_multipart())
  1289.         self.assertEqual(part.get_content_type(), 'application/x-gzip')
  1290.         self.assertEqual(part.get_filename(), 'ZValue.gz')
  1291.         f = tempfile.TemporaryFile()
  1292.         f.write(part.get_payload(decode=True))
  1293.         f.seek(0)
  1294.         self.assertEqual(gzip.GzipFile(mode='rb', fileobj=f).read(), bin_value)
  1295.  
  1296.         # no more parts
  1297.         self.assertRaises(StopIteration, msg_iter.next)
  1298.  
  1299.     def test_write_mime_extra_headers(self):
  1300.         '''write_mime() with extra headers.'''
  1301.  
  1302.         pr = ProblemReport(date = 'now!')
  1303.         pr['Simple'] = 'bar'
  1304.         pr['TwoLine'] = 'first\nsecond\n'
  1305.         io = StringIO()
  1306.         pr.write_mime(io, extra_headers={'Greeting': 'hello world', 
  1307.             'Foo': 'Bar'})
  1308.         io.seek(0)
  1309.  
  1310.         msg = email.message_from_file(io)
  1311.         self.assertEqual(msg['Greeting'], 'hello world')
  1312.         self.assertEqual(msg['Foo'], 'Bar')
  1313.         msg_iter = msg.walk()
  1314.  
  1315.         # first part is the multipart container
  1316.         part = msg_iter.next()
  1317.         self.assert_(part.is_multipart())
  1318.  
  1319.         # second part should be an inline text/plain attachments with all short
  1320.         # fields
  1321.         part = msg_iter.next()
  1322.         self.assert_(not part.is_multipart())
  1323.         self.assertEqual(part.get_content_type(), 'text/plain')
  1324.         self.assert_('Simple: bar' in part.get_payload(decode=True))
  1325.  
  1326.         # no more parts
  1327.         self.assertRaises(StopIteration, msg_iter.next)
  1328.  
  1329.     def test_write_mime_filter(self):
  1330.         '''write_mime() with key filters.'''
  1331.  
  1332.         bin_value = 'AB' * 10 + '\0' * 10 + 'Z'
  1333.  
  1334.         pr = ProblemReport(date = 'now!')
  1335.         pr['GoodText'] = 'Hi'
  1336.         pr['BadText'] = 'YouDontSeeMe'
  1337.         pr['GoodBin'] = bin_value
  1338.         pr['BadBin'] = 'Y' + '\x05' * 10 + '-'
  1339.         io = StringIO()
  1340.         pr.write_mime(io, skip_keys=['BadText', 'BadBin'])
  1341.         io.seek(0)
  1342.  
  1343.         msg = email.message_from_file(io)
  1344.         msg_iter = msg.walk()
  1345.  
  1346.         # first part is the multipart container
  1347.         part = msg_iter.next()
  1348.         self.assert_(part.is_multipart())
  1349.  
  1350.         # second part should be an inline text/plain attachments with all short
  1351.         # fields
  1352.         part = msg_iter.next()
  1353.         self.assert_(not part.is_multipart())
  1354.         self.assertEqual(part.get_content_type(), 'text/plain')
  1355.         self.assertEqual(part.get_content_charset(), 'utf-8')
  1356.         self.assertEqual(part.get_filename(), None)
  1357.         self.assertEqual(part.get_payload(decode=True), '''ProblemType: Crash
  1358. Date: now!
  1359. GoodText: Hi
  1360. ''')
  1361.  
  1362.         # third part should be the GoodBin: field as attachment
  1363.         part = msg_iter.next()
  1364.         self.assert_(not part.is_multipart())
  1365.         f = tempfile.TemporaryFile()
  1366.         f.write(part.get_payload(decode=True))
  1367.         f.seek(0)
  1368.         self.assertEqual(gzip.GzipFile(mode='rb', fileobj=f).read(), bin_value)
  1369.  
  1370.         # no more parts
  1371.         self.assertRaises(StopIteration, msg_iter.next)
  1372.  
  1373.     def test_updating(self):
  1374.         '''new_keys() and write() with only_new=True.'''
  1375.  
  1376.         pr = ProblemReport()
  1377.         self.assertEqual(pr.new_keys(), set(['ProblemType', 'Date']))
  1378.         pr.load(StringIO(
  1379. '''ProblemType: Crash
  1380. Date: now!
  1381. Foo: bar
  1382. Baz: blob
  1383. '''))
  1384.  
  1385.         self.assertEqual(pr.new_keys(), set())
  1386.  
  1387.         pr['Foo'] = 'changed'
  1388.         pr['NewKey'] = 'new new'
  1389.         self.assertEqual(pr.new_keys(), set(['NewKey']))
  1390.  
  1391.         out = StringIO()
  1392.         pr.write(out, only_new=True)
  1393.         self.assertEqual(out.getvalue(), 'NewKey: new new\n')
  1394.  
  1395.     def test_import_dict(self):
  1396.         '''importing a dictionary with update().'''
  1397.  
  1398.         pr = ProblemReport()
  1399.         pr['oldtext'] = 'Hello world'
  1400.         pr['oldbin'] = 'AB' * 10 + '\0' * 10 + 'Z'
  1401.         pr['overwrite'] = 'I am crap'
  1402.  
  1403.         d = {}
  1404.         d['newtext'] = 'Goodbye world'
  1405.         d['newbin'] = '11\000\001\002\xFFZZ'
  1406.         d['overwrite'] = 'I am good' 
  1407.  
  1408.         pr.update(d)
  1409.         self.assertEqual(pr['oldtext'], 'Hello world')
  1410.         self.assertEqual(pr['oldbin'], 'AB' * 10 + '\0' * 10 + 'Z')
  1411.         self.assertEqual(pr['newtext'], 'Goodbye world')
  1412.         self.assertEqual(pr['newbin'], '11\000\001\002\xFFZZ')
  1413.         self.assertEqual(pr['overwrite'], 'I am good')
  1414.  
  1415. if __name__ == '__main__':
  1416.     unittest.main()
  1417.